home *** CD-ROM | disk | FTP | other *** search
/ AmigActive 23 / AACD 23.iso / AACD / Online / opennap / buffer.c < prev    next >
C/C++ Source or Header  |  2001-06-08  |  12KB  |  492 lines

  1. /* Copyright (C) 2000-1 drscholl@users.sourceforge.net
  2.    This is free software distributed under the terms of the
  3.    GNU Public License.  See the file COPYING for details.
  4.  
  5.    $Id: buffer.c,v 1.65 2001/02/15 08:39:45 drscholl Exp $ */
  6.  
  7. #ifndef WIN32
  8. #include <unistd.h>
  9. #include <sys/time.h>
  10. #else
  11. #include <windows.h>
  12. #endif /* !WIN32 */
  13. #include <string.h>
  14. #include <errno.h>
  15. #include <stdlib.h>
  16. #include "opennap.h"
  17. #include "debug.h"
  18.  
  19. static BUFFER *
  20. buffer_new (void)
  21. {
  22.     BUFFER *r = CALLOC (1, sizeof (BUFFER));
  23.  
  24.     if (!r)
  25.     {
  26.     OUTOFMEMORY ("buffer_new");
  27.     return 0;
  28.     }
  29. #if DEBUG
  30.     r->magic = MAGIC_BUFFER;
  31. #endif
  32.     r->data = MALLOC (BUFFER_SIZE);
  33.     if (!r->data)
  34.     {
  35.     OUTOFMEMORY ("buffer_new");
  36.     FREE (r);
  37.     return 0;
  38.     }
  39.     r->datamax = BUFFER_SIZE;
  40.     return r;
  41. }
  42.  
  43. /* append bytes to the buffer */
  44. static BUFFER *
  45. buffer_queue (BUFFER * b, char *d, int dsize)
  46. {
  47.     BUFFER *r = b;
  48.     int     count;
  49.  
  50.     if (b)
  51.     while (b->next)
  52.         b = b->next;
  53.     while (dsize > 0)
  54.     {
  55.     if (!b)
  56.         r = b = buffer_new ();
  57.     else if (b->datasize == b->datamax)
  58.     {
  59.         b->next = buffer_new ();
  60.         b = b->next;
  61.     }
  62.     if (!b)
  63.     {
  64.         /*something really bad just happened!  no choice but to close
  65.            this connection since it will be out of sync */
  66.         buffer_free (r);
  67.         return 0;
  68.     }
  69.     count = dsize;
  70.     /* dsize could be greater than what is allocated */
  71.     if (count > b->datamax - b->datasize)
  72.         count = b->datamax - b->datasize;
  73.     memcpy (b->data + b->datasize, d, count);
  74.     b->datasize += count;
  75.     dsize -= count;
  76.     d += count;
  77.     }
  78.     return r;
  79. }
  80.  
  81. /* consume some bytes from the buffer */
  82. BUFFER *
  83. buffer_consume (BUFFER * b, int n)
  84. {
  85.     ASSERT (buffer_validate (b));
  86.     ASSERT (b->consumed + n <= b->datasize);
  87.     b->consumed += n;
  88.     if (b->consumed >= b->datasize)
  89.     {
  90.     BUFFER *p = b;
  91.  
  92.     b = b->next;
  93.     FREE (p->data);
  94.     FREE (p);
  95.     }
  96.     return b;
  97. }
  98.  
  99. BUFFER *
  100. buffer_append (BUFFER * a, BUFFER * b)
  101. {
  102.     BUFFER *r = a;
  103.  
  104.     ASSERT (b != 0);
  105.     if (!a)
  106.     return b;
  107.     ASSERT (buffer_validate (a));
  108.     while (a->next)
  109.     a = a->next;
  110.     a->next = b;
  111.     return r;
  112. }
  113.  
  114. int
  115. buffer_size (BUFFER * b)
  116. {
  117.     int     n = 0;
  118.  
  119.     ASSERT (b == 0 || buffer_validate (b));
  120.     for (; b; b = b->next)
  121.     n += b->datasize - b->consumed;
  122.     return n;
  123. }
  124.  
  125. void
  126. buffer_free (BUFFER * b)
  127. {
  128.     BUFFER *p;
  129.  
  130.     ASSERT (b == 0 || buffer_validate (b));
  131.     while (b)
  132.     {
  133.     p = b;
  134.     b = b->next;
  135.     FREE (p->data);
  136.     FREE (p);
  137.     }
  138. }
  139.  
  140. #if DEBUG
  141. int
  142. buffer_validate (BUFFER * b)
  143. {
  144.     ASSERT_RETURN_IF_FAIL (VALID_LEN (b, sizeof (BUFFER)), 0);
  145.     ASSERT_RETURN_IF_FAIL (b->magic == MAGIC_BUFFER, 0);
  146.     ASSERT_RETURN_IF_FAIL (b->datasize <= b->datamax, 0);
  147.     ASSERT_RETURN_IF_FAIL (b->data == 0
  148.                || VALID_LEN (b->data, b->datasize), 0);
  149.     ASSERT_RETURN_IF_FAIL (b->consumed == 0 || b->consumed < b->datasize, 0);
  150.     ASSERT_RETURN_IF_FAIL (b->next == 0
  151.                || VALID_LEN (b->next, sizeof (BUFFER *)), 0);
  152.     return 1;
  153. }
  154. #endif /* DEBUG */
  155.  
  156. static BUFFER *
  157. buffer_compress (z_streamp zip, BUFFER ** b)
  158. {
  159.     BUFFER *r = 0, **pr;
  160.     int     n, bytes, flush;
  161.  
  162.     ASSERT (buffer_validate (*b));
  163.  
  164.     /* set up the input */
  165.     bytes = (*b)->datasize - (*b)->consumed;
  166.     zip->next_in = (u_char *) (*b)->data + (*b)->consumed;
  167.     zip->avail_in = bytes;
  168.     /* force a flush if this is the last input to compress */
  169.     flush = ((*b)->next == 0) ? Z_SYNC_FLUSH : Z_NO_FLUSH;
  170.     /* set to 0 so we allocate in the loop */
  171.     zip->avail_out = 0;
  172.  
  173.     pr = &r;
  174.  
  175.     do
  176.     {
  177.     if (zip->avail_out == 0)
  178.     {
  179.         /* allocate a new buffer to hold the rest of the compressed data */
  180.         *pr = buffer_new ();
  181.         if (!*pr)
  182.         break;
  183.         /* mark the buffer as completely full then remove unused data
  184.            when we exit this loop */
  185.         (*pr)->datasize = (*pr)->datamax;
  186.         zip->next_out = (unsigned char *) (*pr)->data;
  187.         zip->avail_out = (*pr)->datasize;
  188.     }
  189.     n = deflate (zip, flush);
  190.     if (n != Z_OK)
  191.     {
  192.         log ("buffer_compress: deflate: %s (error %d)",
  193.          NONULL (zip->msg), n);
  194.         break;
  195.     }
  196.     pr = &(*pr)->next;
  197.     }
  198.     while (zip->avail_out == 0 && flush == Z_SYNC_FLUSH);
  199.  
  200.     /* subtract any uncompressed bytes */
  201.     bytes -= zip->avail_in;
  202.     *b = buffer_consume (*b, bytes);
  203.  
  204.     if (r)
  205.     {
  206.     pr = &r;
  207.     while ((*pr)->next)
  208.         pr = &(*pr)->next;
  209.     (*pr)->datasize -= zip->avail_out;
  210.     /* this should only happen for the first created buffer if the
  211.        input was small and there was a second buffer in the list */
  212.     if ((*pr)->datasize == 0)
  213.     {
  214.         ASSERT (r->next == 0);
  215.         if (r->next != 0)
  216.         log ("buffer_compress: ERROR! r->next was not NULL");
  217.         FREE (r->data);
  218.         FREE (r);
  219.         r = 0;
  220.     }
  221.     }
  222.  
  223.     return r;
  224. }
  225.  
  226. /* assuming that we receive relatively short blocks via the network (less
  227.    than 16kb), we uncompress all data when we receive it and don't worry
  228.    about blocking.
  229.  
  230.    NOTE: this is the only buffer_*() function that does not use the memory
  231.    pool.  each server gets its own real input buffer */
  232. int
  233. buffer_decompress (BUFFER * b, z_streamp zip, char *in, int insize)
  234. {
  235.     int     n;
  236.  
  237.     ASSERT (buffer_validate (b));
  238.     ASSERT (insize > 0);
  239.     zip->next_in = (unsigned char *) in;
  240.     zip->avail_in = insize;
  241.     zip->next_out = (unsigned char *) b->data + b->datasize;
  242.     zip->avail_out = b->datamax - b->datasize;
  243.     /* set this to the max size and subtract what is left after the inflate */
  244.     b->datasize = b->datamax;
  245.     do
  246.     {
  247.     /* if there is no more output space left, create some more */
  248.     if (zip->avail_out == 0)
  249.     {
  250.         /* allocate one extra byte to write a \0 char */
  251.         if (safe_realloc ((void **) &b->data, b->datamax + 2049))
  252.         {
  253.         OUTOFMEMORY ("buffer_decompress");
  254.         return -1;
  255.         }
  256.         b->datamax += 2048;
  257.         zip->next_out = (unsigned char *) b->data + b->datasize;
  258.         zip->avail_out = b->datamax - b->datasize;
  259.         /* set this to the max size and subtract what is left after the
  260.            inflate */
  261.         b->datasize = b->datamax;
  262.     }
  263.     n = inflate (zip, Z_SYNC_FLUSH);
  264.     if (n != Z_OK)
  265.     {
  266.         log ("buffer_decompress: inflate: %s (error %d)",
  267.          NONULL (zip->msg), n);
  268.         return -1;
  269.     }
  270.     }
  271.     while (zip->avail_out == 0);
  272.     /* subtract unused bytes */
  273.     b->datasize -= zip->avail_out;
  274.     return 0;
  275. }
  276.  
  277. void
  278. init_compress (CONNECTION * con, int level)
  279. {
  280.     int     n;
  281.  
  282.     ASSERT (validate_connection (con));
  283.     ASSERT (ISSERVER (con));
  284.     con->sopt->zin = CALLOC (1, sizeof (z_stream));
  285.     if (!con->sopt->zin)
  286.     {
  287.     OUTOFMEMORY ("init_compress");
  288.     return;
  289.     }
  290.     con->sopt->zout = CALLOC (1, sizeof (z_stream));
  291.     if (!con->sopt->zout)
  292.     {
  293.     FREE (con->sopt->zin);
  294.     OUTOFMEMORY ("init_compress");
  295.     return;
  296.     }
  297.  
  298.     n = inflateInit (con->sopt->zin);
  299.     if (n != Z_OK)
  300.     {
  301.     log ("init_compress: inflateInit: %s (%d)",
  302.          NONULL (con->sopt->zin->msg), n);
  303.     }
  304.     n = deflateInit (con->sopt->zout, level);
  305.     if (n != Z_OK)
  306.     {
  307.     log ("init_compress: deflateInit: %s (%d)",
  308.          NONULL (con->sopt->zout->msg), n);
  309.     }
  310.  
  311.     log ("init_compress: compressing server stream at level %d", level);
  312. }
  313.  
  314. void
  315. finalize_compress (SERVER * serv)
  316. {
  317.     int     n;
  318.  
  319.     n = deflateEnd (serv->zout);
  320.     if (n != Z_OK)
  321.     log ("finalize_compress: deflateEnd: %s (%d)",
  322.          NONULL (serv->zout->msg), n);
  323.     n = inflateEnd (serv->zin);
  324.     if (n != Z_OK)
  325.     log ("finalize_compress: inflateEnd: %s (%d)",
  326.          NONULL (serv->zin->msg), n);
  327.     FREE (serv->zin);
  328.     FREE (serv->zout);
  329. }
  330.  
  331. int
  332. send_queued_data (CONNECTION * con)
  333. {
  334.     int     n;
  335.     BUFFER *r;
  336.  
  337.     ASSERT (validate_connection (con));
  338.  
  339.     if (con->destroy)
  340.     {
  341.     /* connection is being shut down, just ignore it */
  342.     clear_write (con->fd);    /* just to be sure */
  343.     return -1;
  344.     }
  345.  
  346.     if (ISSERVER (con))
  347.     {
  348.     /* compress server output until we have at least 16k waiting (about
  349.      * the size of the tcp buffer for the socket)
  350.      */
  351.     while (con->sopt->outbuf && buffer_size (con->sendbuf) < 16384)
  352.     {
  353.         /* buffer_compress will only compress the first buffer in the
  354.          * list, so we possibly need to call it multiple times.
  355.          */
  356.         r = buffer_compress (con->sopt->zout, &con->sopt->outbuf);
  357.         if (!r)
  358.         break;
  359.         con->sendbuf = buffer_append (con->sendbuf, r);
  360.     }
  361.  
  362.     /* for large networks, it might be desirable not to send data every
  363.      * time through the main loop.  this adds support for queuing up
  364.      * a larger amount of data before actually doing a write()
  365.      */
  366.     if (Server_Chunk > 0)
  367.     {
  368.         /* check to see if enough data has been accumulated to send */
  369.         if (buffer_size (con->sendbuf) < Server_Chunk)
  370.         {
  371.         clear_write (con->fd);    /* turn off check for write */
  372.         return 0;    /* wait until more data is recv'd */
  373.         }
  374.     }
  375.     }
  376.  
  377.     /* write until the queue is consumed, or we would block */
  378.     while (con->sendbuf)
  379.     {
  380.     n = WRITE (con->fd, con->sendbuf->data + con->sendbuf->consumed,
  381.            con->sendbuf->datasize - con->sendbuf->consumed);
  382.     if (n == -1)
  383.     {
  384.         if (N_ERRNO != EWOULDBLOCK && N_ERRNO != EDEADLK &&
  385.             N_ERRNO != ENOBUFS)
  386.         {
  387.         clear_write (con->fd);    /* just to be sure */
  388.         log ("send_queued_data: write: %s (errno %d) for host %s",
  389.              strerror (N_ERRNO), N_ERRNO, con->host);
  390.         return -1;
  391.         }
  392.         break;
  393.     }
  394.     else if (n == 0)
  395.     {
  396.         log ("send_queued_data: wrote 0 bytes to fd %d", con->fd);
  397.         break;
  398.     }
  399.  
  400.     /* mark data as written */
  401.     con->sendbuf = buffer_consume (con->sendbuf, n);
  402.  
  403.     /* keep track of the outgoing bandwidth */
  404.     global.bytes_out += n;
  405.     }
  406.  
  407.     /* check to make sure the queue hasn't gotten too big */
  408.  
  409.     if (ISSERVER (con))
  410.     {
  411.     /* for a server, we will have up to 16k in con->sendbuf, and then
  412.      * possibly a lot more in con->sopt->outbuf.  the latter
  413.      * is uncompressed data.
  414.      */
  415.     if (buffer_size (con->sopt->outbuf) > Server_Queue_Length)
  416.     {
  417.         log ("send_queued_data: output buffer for %s exceeded %u bytes",
  418.          con->host, Server_Queue_Length);
  419.         return -1;
  420.     }
  421.     }
  422.     else if (buffer_size (con->sendbuf) > Client_Queue_Length)
  423.     {
  424.     log ("send_queued_data: output buffer for %s exceeded %u bytes",
  425.          con->host, Client_Queue_Length);
  426.     return -1;
  427.     }
  428.  
  429.     if (con->sendbuf || (ISSERVER (con) && con->sopt->outbuf))
  430.     /* still need to write */
  431.     set_write (con->fd);
  432.     else
  433.     /* output queue is empty, clear the write bit */
  434.     clear_write (con->fd);
  435.  
  436.     return 0;
  437. }
  438.  
  439. void
  440. queue_data (CONNECTION * con, char *s, int ssize)
  441. {
  442.     ASSERT (validate_connection (con));
  443.     if (ISSERVER (con))
  444.     {
  445.     /* always queue server data so we can compress it more effciently */
  446.     con->sopt->outbuf = buffer_queue (con->sopt->outbuf, s, ssize);
  447.     if (!con->sopt->outbuf)
  448.         destroy_connection (con);
  449.     /* server connections are always flushed at the end of the main
  450.      * event loop, so we don't need to call set_write() here
  451.      */
  452.     }
  453.     else if (!con->destroy)
  454.     {
  455.     /* if no output is queued, immediately attempt to send it now to
  456.      * avoid copying
  457.      */
  458.     if (!con->sendbuf)
  459.     {
  460. #ifndef ALWAYS_QUEUE
  461.         int     n = WRITE (con->fd, s, ssize);
  462.  
  463.         if (n == -1)
  464.         {
  465.         if (N_ERRNO != EWOULDBLOCK)
  466.         {
  467.             log ("queue_data: %s: write: %s (errno %d)",
  468.              con->host, strerror (N_ERRNO), N_ERRNO);
  469.             destroy_connection (con);
  470.             return;
  471.         }
  472.         /* queue the data */
  473.         }
  474.         else if (n == ssize)
  475.         return;        /* all written, nothing else to do */
  476.         else
  477.         {
  478.         /* queue the portion that didn't get written */
  479.         s += n;
  480.         ssize -= n;
  481.         }
  482. #endif /* ! ALWAYS_QUEUE */
  483.         /* we want to know when the socket becomes writable again */
  484.         set_write (con->fd);
  485.     }
  486.  
  487.     con->sendbuf = buffer_queue (con->sendbuf, s, ssize);
  488.     if (!con->sendbuf)
  489.         destroy_connection (con);
  490.     }
  491. }
  492.